package com.atguigu.flink.datastreamapi.agg;

import com.atguigu.flink.function.WaterSensorMapFunction;
import com.atguigu.flink.pojo.WaterSensor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

import java.util.HashMap;
import java.util.Map;

/**
 * Created by Smexy on 2023/11/11
 */
public class Demo6_ProcessKeyAgg
{
    public static void main(String[] args) {

        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 3333);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

        env.setParallelism(1);

        KeyedStream<WaterSensor, String> stream = env
            .socketTextStream("hadoop102", 8888)
            .map(new WaterSensorMapFunction())
            //聚合前先分组
            .keyBy(WaterSensor::getId);

        /*
            KeyedProcessFunction<KEY, T, R>: 处理函数，多提供了一个key
         */
        stream
            //聚合每一种传感器的vc之和
            .process(new KeyedProcessFunction<String, WaterSensor, String>()
            {
                /*
                    当前Task，可能会收到多个Key。在累积之前，先去判断，每一种key都只累加到对应的sumVc

                    s1--->s1sumVc
                    s3--->s3sumVc
                 */
                private Map<String,Integer> map = new HashMap<>();
                @Override
                public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                    String key = ctx.getCurrentKey();
                    Integer lastSumVc = map.getOrDefault(key, 0);
                    lastSumVc += value.getVc();
                    map.put(key,lastSumVc);
                    out.collect("当前传感器"+key+"的vc之和:"+ lastSumVc);
                }
            })
            .print();

                try {
                            env.execute();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
    }
}
